package com.test.rocketmq.listener;

import com.test.rocketmq.constant.TxProducerGroupConstant;
import com.test.rocketmq.domain.OrderPaidEvent;
import com.test.rocketmq.service.OrderService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;

/**
 * @Author: huangkunming
 * @Date: 2021/05/05 16:32
 * @Description: 事务消息监听
 */
@Slf4j
@RocketMQTransactionListener(txProducerGroup = TxProducerGroupConstant.ORDER_TX_PRODUCER_GROUP)
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {

    @Autowired
    private OrderService orderService;

    /**
     * 执行本地的事务逻辑
     *
     * @param msg
     * @param arg
     * @return
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            OrderPaidEvent payload = (OrderPaidEvent) msg.getPayload();
            Boolean result = orderService.createOrder(payload);
            if (result) {
                // 本地事务执行成功，确认发消息给消费者
                return RocketMQLocalTransactionState.COMMIT;
            } else {
                // 本地事务执行失败，消息回滚
                return RocketMQLocalTransactionState.ROLLBACK;
            }
        } catch (Exception e) {
            e.printStackTrace();
            log.error("本地事务执行失败：如数据库异常，或SQL异常，或其他未知异常，异常原因:{}", e.getMessage());
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    /**
     * 执行事务回查逻辑
     * 6s 本地事务还没执行完成，就会触发回调检查的方法
     *
     * @param msg
     * @return
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        OrderPaidEvent payload = (OrderPaidEvent) msg.getPayload();
        Boolean isExist = orderService.checkOrder(payload.getOrderNo());
        if (isExist) {
            // 事务已正确提交，确认发送消息给消费者
            return RocketMQLocalTransactionState.COMMIT;
        } else {
            // 事务未提交，需要继续回查
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }

    /**
     * RocketMQLocalTransactionState说明：
     *
     * COMMIT：
     *         表示事务消息被提交，会被正确分发给消费者。(事务消息先会发送到broker，对消费端不可见，为UNKNOWN状态，在这里回调的时候如果返回COMMIT
     *         那么。消费端马上就会接收到消息。)
     *
     * ROLLBACK：
     *          该状态表示该事务消息被回滚，因为本地事务逻辑执行失败导致（如数据库异常，或SQL异常，或其他未知异常，这时候消息在broker会被删除掉，
     *          不会发送到consumer）
     * UNKNOWN：
     *        表示事务消息未确定，可能是业务方执行本地事务逻辑时间耗时过长或者网络原因等引起的，该状态会导致broker对事务消息进行回查，默认回查
     *        总次数是15次，第一次回查间隔时间是6s，后续每次间隔60s,（消息一旦发送状态就为中间状态：UNKNOWN）
     */
}